package com.xzc.apitest.tabletest

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.api.scala._
import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}

object MysqlOutputTest {
  def main(args: Array[String]): Unit = {
    // 1. Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val tableEnv = StreamTableEnvironment.create(env)

    // 2.1 Read the source file and register it as a table
    val filePath = "D:\\git\\learning_flink\\_01_试用\\src\\main\\resources\\sensor.txt"
    tableEnv.connect(new FileSystem().path(filePath))
      .withFormat(new Csv())
      .withSchema(new Schema()
        .field("id", DataTypes.STRING())
        .field("timestamp", DataTypes.BIGINT())
        .field("temp", DataTypes.DOUBLE()))
      .createTemporaryTable("inputTable")
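    // A DDL-based alternative for registering the same CSV source (a sketch only, not executed here;
    // it assumes the legacy 'connector.*' / 'format.*' property keys of the Flink 1.10-era connectors
    // and a placeholder file path):
    // tableEnv.sqlUpdate(
    //   """
    //     |create table inputTable (
    //     | id varchar(20),
    //     | `timestamp` bigint,
    //     | temp double
    //     |) with (
    //     | 'connector.type' = 'filesystem',
    //     | 'connector.path' = 'file:///path/to/sensor.txt',
    //     | 'format.type' = 'csv'
    //     |)
    //     |""".stripMargin)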

    // 3. Transformations
    val sensorTable = tableEnv.from("inputTable")
    // 3.1 Simple projection and filter
    val resultTable = sensorTable
      .select('id, 'temp)
      .filter('id === "sensor_1")
    // 3.2 Aggregation
    val aggTable = sensorTable
      .groupBy('id)
      .select('id, 'id.count as 'cnt) // alias matches the cnt column of the JDBC sink table below

    resultTable.toAppendStream[(String, Double)].print("result")
    // Aggregated results cannot be emitted as an append stream, because earlier results get updated
    // In the retract stream, false marks a previously emitted record as retracted; the following true record is the new result
    aggTable.toRetractStream[(String, Long)].print("agg")
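    // Sample retract output (values are illustrative): the first element of each tuple is the
    // change flag, the second is the record itself.
    //   agg> (true,(sensor_1,1))    -- first count for sensor_1
    //   agg> (false,(sensor_1,1))   -- retract the now-outdated result
    //   agg> (true,(sensor_1,2))    -- emit the updated count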

    // 4. Sink to MySQL: the descriptor-based connect() approach (as used for Elasticsearch) also works,
    // but here the JDBC sink is registered via DDL
    val sinkDDL: String =
      """
        |create table jdbcOutputTable (
        | id varchar(20) not null,
        | cnt bigint not null
        |) with (
        | 'connector.type' = 'jdbc',
        | 'connector.url' = 'jdbc:mysql://localhost:3306/test',
        | 'connector.table' = 'sensor_count',
        | 'connector.driver' = 'com.mysql.jdbc.Driver',
        | 'connector.username' = 'root',
        | 'connector.password' = '123'
        |)
        |""".stripMargin
    tableEnv.sqlUpdate(sinkDDL)
    aggTable.insertInto("jdbcOutputTable")
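    // Alternative: express the aggregation in SQL and insert straight into the registered sink
    // (a sketch only; it assumes the same inputTable / jdbcOutputTable names registered above):
    // tableEnv.sqlUpdate(
    //   """
    //     |insert into jdbcOutputTable
    //     |select id, count(id) as cnt from inputTable group by id
    //     |""".stripMargin)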

    env.execute("mysql sink test")
  }

}
